package com.gitee.kamismile.stoneEureka.server.handler;

import com.gitee.kamismile.gatewayAgent.common.APPChannel;
import com.gitee.kamismile.gatewayAgent.protobuf.Message;
import com.gitee.kamismile.gatewayAgent.server.handler.GateWayServerHandler;
import com.gitee.kamismile.stone.commmon.exception.BusinessException;
import com.gitee.kamismile.stone.commmon.util.JsonUtil;
import com.gitee.kamismile.stone.commmon.util.ValueUtils;
import com.google.protobuf.ByteString;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import com.gitee.kamismile.stoneEureka.server.service.IEurekaService;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
import org.springframework.context.ApplicationContext;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import java.util.*;

public class EurekaServerHandler extends GateWayServerHandler {

    protected final Logger logger = LoggerFactory.getLogger("bLog");

    private ApplicationContext applicationContext;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Message.RouterMessage msgRequest) throws Exception {
        List<Message.RouterRequest> requestList = msgRequest.getRequestList();
        if (CollectionUtils.isEmpty(requestList)) {
            return;
        }

        String clientId = getClientIdString(ctx);
        String remoteAddress = ctx.channel().remoteAddress().toString();

        Flux.defer(() -> {

            Message.RouterMessage.Builder msg = Message.RouterMessage.newBuilder();
            return Flux.fromIterable(requestList)
                    .flatMap(request -> {
                        return Flux.defer(() -> {

                            try {
                                this.setReqCmd(request.getCmd().getCommand());
                                this.setSeq(request.getSeq());
                                String data = new String(request.getSerializedData().toByteArray(), "utf-8");

                                Map<String, Object> dataJson = ValueUtils.isNull(data) ? new HashMap<String, Object>() : JsonUtil.json2Map(data);
                                dataJson.put("cmd", this.getReqCmd());
                                dataJson.put("clientId", clientId);
                                dataJson.put("remoteAddress", remoteAddress);

                                if (this.getReqCmd().equals("app@register@version")||this.getReqCmd().equals("app@vertx@version")) {
                                    String token = ValueUtils.isStringNull(dataJson.get("token"));
                                    if (StringUtils.isEmpty(token)) {
                                        throw new BusinessException("error");
                                    }

                                    Channel channel = ctx.channel();
                                    ChannelId channelId = channel.id();
                                    APPChannel.channelTokens.put(channelId.asLongText(), token);
                                    APPChannel.tokenChannels.put(token, channelId.asLongText());
                                    APPChannel.channels.add(channel);
                                }

                                Collection<IEurekaService> eurekaServices = applicationContext.getBeansOfType(IEurekaService.class).values();
                                Optional<Map<String, Object>> mapOptional = eurekaServices.stream().filter(e -> e.isCmd(this.getReqCmd())).map(e -> e.doService(dataJson)).findFirst();

                                Message.RouterResponse.Builder resp = Message.RouterResponse.newBuilder();

                                resp.setSeq(this.getSeq());
                                resp.setRouter(ValueUtils.isStringNull(getGatewayAgentConstant().getRouter()));

                                Message.Command.Builder cmd = Message.Command.newBuilder();
                                cmd.setCommand(this.getReqCmd());
                                resp.setCmd(cmd);

                                Message.Result.Builder result = Message.Result.newBuilder();
                                result.setCode(0);
                                Map<String, Object> json = new HashMap<String, Object>();
                                json.put("data", "{\"data\":\"ok\"}");

                                if (mapOptional.isPresent()) {
                                    Map<String, Object> map = mapOptional.get();
                                    json.put("data", JsonUtil.toJson(map));
                                }

                                resp.setResult(result);
                                resp.setSerializedData(ByteString.copyFrom(json.get("data").toString().getBytes("utf-8")));
                                return Flux.just(resp)
                                        .subscribeOn(Schedulers.boundedElastic()) ;
                            } catch (Exception ex) {
                                logger.error(ExceptionUtils.getFullStackTrace(ex));
                                throw new BusinessException(-1, ex.getMessage());
                            }
                        })     .subscribeOn(Schedulers.boundedElastic()) ;
                    }).subscribeOn(Schedulers.boundedElastic())
                    .collectList()
                    .doOnSuccess((v) -> {
                        v.stream().forEach(res -> msg.addResponse(res));
                        ctx.writeAndFlush(msg);
                    });
        }).subscribeOn(Schedulers.boundedElastic())
                .subscribe();
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        APPChannel.channels.remove(ctx.channel());
        String token = APPChannel.channelTokens.remove(ctx.channel().id().asLongText());
        if (org.apache.commons.lang3.StringUtils.isNotBlank(token)) {
            applicationContext.getBeansOfType(IEurekaService.class).values().forEach(v->v.delete(token));
            APPChannel.tokenChannels.remove(token);
        }
        super.exceptionCaught(ctx, cause);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        APPChannel.channels.remove(ctx.channel());
        String token = APPChannel.channelTokens.remove(ctx.channel().id().asLongText());
        if (org.apache.commons.lang3.StringUtils.isNotBlank(token)) {
            applicationContext.getBeansOfType(IEurekaService.class).values().forEach(v->v.delete(token));
            APPChannel.tokenChannels.remove(token);
        }
        super.channelInactive(ctx);
        ctx.close();
    }

    public ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    public void setApplicationContext(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    private String getClientIdString(ChannelHandlerContext ctx) {
        ChannelId channelId = ctx.channel().id();
        String clientId = channelId.asLongText();
        return clientId;
    }
}
